gRPC的metadata

96次阅读

共计 3936 个字符,预计需要花费 10 分钟才能阅读完成。

概念

metadata 是以 key-value 的形式存储数据的,其中 key 是字符串类型,value 是字符串数组类型,类似于 http 请求中的 header

文档

https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md

新建 metadata

// 使用 metadata.New 创建
md := metadata.New(map[string]string{"key": "value"})
// 使用 metadata.New 创建
md := metadata.Pairs(
    "key1", "value1",
    "key1", "value2",
    "key2", "value2",
)

MD 本质上是一个 map[string][]string 类型。

客户端发送 metadata

使用 metadata.NewOutgoingContext 设置。

// 设置 metadata
md := metadata.New(map[string]string{"origin": "client"})
ctx := metadata.NewOutgoingContext(context.Background(), md)

服务端接收 metadata

使用 metadata.FromIncomingContext 设置。

// 接收 metadata
if md, ok := metadata.FromIncomingContext(ctx); ok {fmt.Println("server:", md["origin"])
}

简单模式下 metadata 示例

proto 文件

syntax = "proto3";

package proto;

option go_package = "/cal;cal";

message RequestInfo {
  int64 number1 = 1;
  int64 number2 = 2;
}

message ResponseInfo {int64 res = 1;}

service Cal {rpc Add (RequestInfo) returns (ResponseInfo) {}}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "test/cal"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := cal.NewCalClient(conn)

    // 设置 metadata
    md := metadata.New(map[string]string{"origin": "client"})
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // 调用服务
    res, _ := client.Add(ctx, &cal.RequestInfo{
        Number1: 1,
        Number2: 1,
    })
    fmt.Println(res.Res)
}

服务端

package main

import (
    "fmt"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net"
    "test/cal"
)

type Cal struct {cal.UnimplementedCalServer}

func (c *Cal) Add(ctx context.Context, req *cal.RequestInfo) (*cal.ResponseInfo, error) {
    // 接收 metadata
    if md, ok := metadata.FromIncomingContext(ctx); ok {fmt.Println(md["origin"])
    }
    return &cal.ResponseInfo{Res: req.Number1 + req.Number2}, nil
}

func main() {fmt.Println("start")
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化 grpc 服务
    s := grpc.NewServer()
    // 注册服务
    cal.RegisterCalServer(s, &Cal{})
    // 启动
    s.Serve(listen)
}

流模式下 metadata 实例

proto 文件

syntax = "proto3";

package proto;

option go_package = "/stream;stream";

message RequestInfo {string data = 1;}

message ResponseInfo {string data = 1;}

service Stream {rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {}}

客户端

流模式下客户端接收服务端的 metadata 使用 Header() 方法。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "sync"
    "test/stream"
    "time"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    wg := sync.WaitGroup{}
    wg.Add(2)
    // 设置请求的 metadata
    md := metadata.New(map[string]string{"origin": "client",})
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    all, _ := client.AllStream(ctx)
    go func() {defer wg.Done()
        for {if res, err := all.Recv(); err != nil {fmt.Println(err)
                break
            } else {
                // 打印服务端的 metadata
                header, _ := all.Header()
                fmt.Println("header", header)
                fmt.Println(res.Data)
            }
        }
    }()
    go func() {defer wg.Done()
        for i := 0; i < 10; i++ {
            _ = all.Send(&stream.RequestInfo{Data: fmt.Sprintf(" 客户端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()}

服务端

流模式下服务端发送 metadata 使用 SetHeader() 方法。

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net"
    "sync"
    "test/stream"
    "time"
)

type Stream struct {stream.UnimplementedStreamServer}

func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {
    // 打印客户端的 metadata
    if md, ok := metadata.FromIncomingContext(all.Context()); ok {fmt.Println(md["origin"])
    }
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {defer wg.Done()
        for {if res, err := all.Recv(); err != nil {fmt.Println(err)
                break
            } else {fmt.Println(res.Data)
            }
        }
    }()
    go func() {defer wg.Done()
        // 设置服务端的 metadata
        md := metadata.New(map[string]string{"origin": "server",})
        all.SetHeader(md)
        for i := 0; i < 10; i++ {
            _ = all.Send(&stream.ResponseInfo{Data: fmt.Sprintf(" 服务端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
    return nil
}

func main() {fmt.Println("start")
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化 grpc 服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}
正文完
 
dkp
版权声明:本站原创文章,由 dkp 2024-01-05发表,共计3936字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。